#include "config.h"

#include <sys/statvfs.h>
#include <sys/epoll.h>
#include <time.h>
#include <unistd.h>
#include <string.h>
#include <semaphore.h>
#include <errno.h>
#include <sys/statfs.h>

#define DBG_SUBSYS S_LIBCHUNK

#include "configure.h"
#include "cluster.h"
#include "squeue.h"
#include "cache.h"
#include "core.h"
#include "conn.h"
#include "lich_md.h"
#include "../replica/diskmd/disk/disk.h"
#include "../replica/replica.h"
#include "../controller/md_proto.h"
#include "chunk_proto.h"
#include "chunk_bh.h"
#include "../../cluster/dispatch/dispatch.h"
#include "chunk_ops.h"
#include "chunk.h"
#include "chunk_cleanup.h"
#include "longtask.h"
#include "job_dock.h"
#include "dbg.h"

/*
 * Context bundle handed to the static check/recovery helpers in this file
 * so that they can share one pointer instead of a long parameter list.
 */
typedef struct {
        char *pool;             /* pool name; args_init() stores the caller's pointer, no copy is made */
        fileid_t parent;        /* parent id, copied by value in args_init() */
        chkinfo_t *chkinfo;     /* chunk info; replica locations/status are read and updated through it */
        chkstat_t *chkstat;     /* holds the per-replica repstat[] entries the helpers update */
        vfm_t *vfm;             /* may be NULL; several helpers branch on it */
        lease_token_t token;    /* lease token, passed on to chunk_connect() */
        int oflags;             /* 0 after args_init(); set to 1 by __chunk_recovery_old__, passed by address to chunk_push_newdisk() */
} args_t;

/* Forward declaration: __chunk_recovery_old() calls this before its definition further down. */
STATIC int __chunk_recovery_new(args_t *args, int _from, int _to,
                            const clockstat_t *clockstat);

/**
 * Populate @args with the context used by the check/recovery helpers.
 *
 * @parent and @token are copied by value.  @pool, @chkinfo, @chkstat and
 * @vfm are stored as pointers only (the const qualifier on @pool is cast
 * away to fit the field type).  oflags starts at 0.
 */
static inline void args_init(args_t *args, const char *pool, const fileid_t *parent,
                             chkinfo_t *chkinfo, chkstat_t *chkstat, vfm_t *vfm, const lease_token_t *token)
{
        *args = (args_t) {
                .pool = (char *)pool,
                .parent = *parent,
                .chkinfo = chkinfo,
                .chkstat = chkstat,
                .vfm = vfm,
                .token = *token,
                .oflags = 0,
        };
}

/**
 * Hand a chunk to chunk_bh_sync() when enough fault domains are online.
 *
 * Nothing is done for raw chunks, when conn_faultdomain() fails (a warning
 * is logged), or when fewer fault domains are online than the chunk has
 * replicas.
 *
 * @param parent    parent id forwarded to chunk_bh_sync()
 * @param chkinfo   chunk whose id and repnum are inspected
 * @param vfm       unused; kept so existing call sites need not change
 * @param priority  forwarded to chunk_bh_sync()
 */
static void __chunk_bh_sync(const chkid_t *parent, const chkinfo_t *chkinfo, const vfm_t *vfm, int priority)
{
        int ret, online, total;

        (void) vfm;

        if (chkinfo->id.type == __RAW_CHUNK__)
                return;

        ret = conn_faultdomain(&total, &online);
        if (unlikely(ret)) {
                DWARN("get fault domain fail\n");
                return;
        }

        YASSERT(online <= total);

        /* not enough fault domains online to hold every replica: skip */
        if (online < chkinfo->repnum) {
                DBUG(CHKID_FORMAT" sync, fault domain online %u need %u, skip\n", CHKID_ARG(&chkinfo->id), online, chkinfo->repnum);
                return;
        }

        chunk_bh_sync(parent, &chkinfo->id, priority);
}

/**
 * Push the chunk from replica slot @_from to replica slot @_to with
 * chunk_push() (tagged "old"), then connect to the target and record the
 * replica state it reports in chkstat->repstat[_to].
 *
 * args->oflags is set to 1 before the push.  ENOSPC from chunk_push() is
 * logged and followed by EXIT(EAGAIN); any other failure is returned.
 *
 * @return 0 on success, otherwise the error from chunk_push() or
 *         chunk_connect().
 */
static int __chunk_recovery_old__(args_t *args, int _from, int _to, const clockstat_t *clockstat)
{
        int ret;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;
        const nid_t *src = &chkinfo->diskid[_from].id;
        const nid_t *dst = &chkinfo->diskid[_to].id;
        clockstat_t peer_clock;
        repstat_t peer_stat;

        args->oflags = 1;

        ret = chunk_push(args->pool, &chkinfo->id, src, dst, &clockstat->vclock,
                         chkinfo->info_version, &args->parent, -1, 0, "old");
        if (unlikely(ret)) {
                if (ret != ENOSPC) {
                        GOTO(err_ret, ret);
                }

                DERROR("recovery "CHKID_FORMAT" to %s, force restart\n",
                       CHKID_ARG(&chkinfo->id), network_rname(dst));
                EXIT(EAGAIN);

                return 0;
        }

        ret = chunk_connect(dst, &chkinfo->id, &args->parent, 0, &args->token, &peer_clock, &peer_stat);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        chkstat->repstat[_to] = peer_stat;

        return 0;
err_ret:
        return ret;
}

/**
 * Pick the recovery routine by chunk type: volume and pool chunks go
 * through __chunk_recovery_old__(), every other type through
 * __chunk_recovery_new() (the original comment marks that path "for balance").
 *
 * @return whatever the selected routine returns.
 */
static int __chunk_recovery_old(args_t *args, int _from, int _to, const clockstat_t *clockstat)
{
        if (args->chkinfo->id.type != __VOLUME_CHUNK__
            && args->chkinfo->id.type != __POOL_CHUNK__) {
                /* for balance */
                return __chunk_recovery_new(args, _from, _to, clockstat);
        }

        return __chunk_recovery_old__(args, _from, _to, clockstat);
}

/**
 * Thin wrapper around chunk_push_newdisk() that supplies the pool, chunk
 * info, parent id and the address of args->oflags from @args.
 *
 * @return the result of chunk_push_newdisk().
 */
static int __chunk_push_newdisk(args_t *args, int _from, int _to, const vclock_t *vclock)
{
        int ret;

        ret = chunk_push_newdisk(args->pool, args->chkinfo, _from, _to, vclock,
                                 &args->parent, -1, 0, &args->oflags);

        return ret;
}

/**
 * Report whether @diskid can be reached.
 *
 * @ltime is forwarded to network_connect(); callers read it back after a
 * successful return.
 *
 * @return 1 if network_connect() returned 0, otherwise 0.
 */
STATIC int __chunk_online(const diskid_t *diskid, time_t *ltime)
{
        return network_connect(diskid, ltime, 1, 0) ? 0 : 1;
}

/**
 * Decide whether two replicas can be treated as identical based on their
 * clock state.
 *
 * The replicas are NOT equal when
 *   - both are marked lost and gloconf.clock_unsafe is 0, or
 *   - either one is dirty, or
 *   - their lost flags, vclock.clock or vclock.vfm differ.
 * In every not-equal case the two clock states are logged.
 *
 * @return 1 if equal, 0 otherwise.
 */
STATIC int __chunk_equal(const chkid_t *chkid,
                         const nid_t *nid1, const clockstat_t *clockstat1,
                         const nid_t *nid2, const clockstat_t *clockstat2)
{
        int both_lost, mismatch;

        both_lost = clockstat1->lost && clockstat2->lost
                    && gloconf.clock_unsafe == 0;

        mismatch = clockstat1->dirty || clockstat2->dirty
                   || clockstat1->lost != clockstat2->lost
                   || clockstat1->vclock.clock != clockstat2->vclock.clock
                   || clockstat1->vclock.vfm != clockstat2->vclock.vfm;

        if (!both_lost && !mismatch)
                return 1;

#if ENABLE_CHUNK_DEBUG
        DINFO("check "CHKID_FORMAT" vclock %s:%ju,%ju,%u,%u --> %s:%ju,%ju,%u,%u\n",
              CHKID_ARG(chkid),
              network_rname(nid1), clockstat1->vclock.clock, clockstat1->vclock.vfm, clockstat1->dirty, clockstat1->lost,
              network_rname(nid2), clockstat2->vclock.clock, clockstat2->vclock.vfm, clockstat2->dirty, clockstat2->lost);
#else
        DBUG("check "CHKID_FORMAT" vclock %s:%ju,%ju,%u,%u --> %s:%ju,%ju,%u,%u\n",
             CHKID_ARG(chkid),
             network_rname(nid1), clockstat1->vclock.clock, clockstat1->vclock.vfm, clockstat1->dirty, clockstat1->lost,
             network_rname(nid2), clockstat2->vclock.clock, clockstat2->vclock.vfm, clockstat2->dirty, clockstat2->lost);
#endif

        return 0;
}

/**
 * Decide whether a mismatching online replica may be skipped by relying on
 * the vfm instead of being recovered right away.
 *
 * All of the following must hold, checked in this order (later checks are
 * not evaluated once one fails):
 *   1. @vfm is not NULL;
 *   2. vfm->clock is newer than the replica's vclock.vfm;
 *   3. the replica is marked lost, or vfm_exist() reports @nid;
 *   4. vfm_add_check() returns 0 for @nid.
 *
 * @return 1 if the replica may be skipped, 0 otherwise.
 */
static int __chunk_clean_online_usevfm(vfm_t *vfm, const clockstat_t *clockstat,
                                       const nid_t *nid, const chkinfo_t *chkinfo)
{
        return vfm != NULL
               && vfm->clock > clockstat->vclock.vfm
               && (clockstat->lost != 0 || vfm_exist(vfm, nid) != 0)
               && vfm_add_check(vfm, nid, chkinfo) == 0;
}

/**
 * Handle a clean replica whose node answered but whose chunk_connect()
 * failed with EIO/ENODEV (__chunk_clean_online calls this in that case).
 *
 *   - @recover set: rebuild through __chunk_recovery_new().
 *   - no vfm: mark the replica dirty, clear its repstat ltime/magic and
 *     request a background sync.
 *   - vfm present and vfm_add_check() returns 0: skip the replica (clear
 *     ltime/magic, request a background sync).
 *   - vfm present otherwise: force a sync through __chunk_recovery_old().
 *
 * @return 0 on success, otherwise the error of the failing step.
 */
STATIC int __chunk_clean_disk_offline(args_t *args, int _from, int _to,
                                      const clockstat_t *clockstat, int recover)
{
        int ret;
        const nid_t *from, *to;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;
        const chkid_t *chkid = &chkinfo->id;
        vfm_t *vfm = args->vfm;
        char tmp[MAX_PATH_LEN];

        from = &chkinfo->diskid[_from].id;
        to = &chkinfo->diskid[_to].id;

        /* NOTE(review): vfm may be NULL here (it is tested below), so this
         * relies on vfm_dump() tolerating a NULL vfm -- confirm. */
        vfm_dump(vfm, tmp);
        /* the format previously lacked a conversion for the trailing
         * network_rname(to) argument, so the destination was never printed */
        DBUG("force sync "CHKID_FORMAT" vfm %s vclock %s:%ju,%ju,%u,%u --> %s\n",
             CHKID_ARG(chkid), tmp,
             network_rname(from), clockstat->vclock.clock,
             clockstat->vclock.vfm, clockstat->dirty, clockstat->lost,
             network_rname(to));

        if (recover) {
                ret = __chunk_recovery_new(args, _from, _to, clockstat);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                DBUG("connect "CHKID_FORMAT" success with recover\n", CHKID_ARG(&chkinfo->id));
        } else if (vfm == NULL) {
                ret = chkinfo_set_status(chkinfo, _to, __S_DIRTY);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                chkstat->repstat[_to].ltime = 0;
                chkstat->repstat[_to].magic = 0;

                __chunk_bh_sync(&args->parent, chkinfo, NULL, 0);
        } else if (vfm_add_check(vfm, to, chkinfo) == 0) {
                DBUG("connect "CHKID_FORMAT" fail, skip with vfm\n", CHKID_ARG(&chkinfo->id));

                __chunk_bh_sync(&args->parent, chkinfo, vfm, 0);
                chkstat->repstat[_to].ltime = 0;
                chkstat->repstat[_to].magic = 0;
        } else {
                vfm_dump(vfm, tmp);
                DINFO("force sync "CHKID_FORMAT" vfm %s vclock %s:%ju,%ju,%u,%u --> %s\n",
                      CHKID_ARG(chkid), tmp,
                      network_rname(from), clockstat->vclock.clock,
                      clockstat->vclock.vfm, clockstat->dirty, clockstat->lost,
                      network_rname(to));

                ret = __chunk_recovery_old(args, _from, _to, clockstat);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Check a clean replica whose node is online against the source replica.
 *
 * Steps:
 *   1. chunk_connect() to the target.  EIO/ENODEV is delegated to
 *      __chunk_clean_disk_offline(); any other error is returned.
 *   2. If both clock states are equal (__chunk_equal), store the returned
 *      repstat and finish.
 *   3. Otherwise:
 *      - @recover set: recover via __chunk_recovery_old();
 *      - no vfm: mark the replica dirty, clear ltime/magic, request a
 *        background sync;
 *      - vfm allows skipping (__chunk_clean_online_usevfm): clear
 *        ltime/magic and request a background sync;
 *      - else force a sync via __chunk_recovery_old().
 *
 * @return 0 on success, otherwise the error of the failing step.
 */
STATIC int __chunk_clean_online(args_t *args, int _from, int _to,
                                const clockstat_t *clockstat, int recover)
{
        int ret;
        uint64_t vfm_clock;
        clockstat_t clockstat2;
        const nid_t *from, *to;
        repstat_t repstat;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;
        const chkid_t *chkid = &chkinfo->id;
        vfm_t *vfm = args->vfm;
        char tmp[MAX_PATH_LEN];

        from = &chkinfo->diskid[_from].id;
        to = &chkinfo->diskid[_to].id;

        ret = chunk_connect(to, chkid, &args->parent, 0, &args->token, &clockstat2, &repstat);
        if (unlikely(ret)) {
                if (ret != EIO && ret != ENODEV)
                        GOTO(err_ret, ret);

                /* node reachable but the replica's disk is not usable */
                ret = __chunk_clean_disk_offline(args, _from, _to,
                                                 clockstat, recover);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                return 0;
        }

        if (__chunk_equal(chkid, from, clockstat, to, &clockstat2)) {
                DBUG("connect "CHKID_FORMAT" success\n", CHKID_ARG(&chkinfo->id));
                chkstat->repstat[_to] = repstat;
                return 0;
        }

        if (recover) {
                ret = __chunk_recovery_old(args, _from, _to, clockstat);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                DBUG("connect "CHKID_FORMAT" success with recover\n", CHKID_ARG(&chkinfo->id));
                return 0;
        }

        if (vfm == NULL) {
                ret = chkinfo_set_status(chkinfo, _to, __S_DIRTY);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                chkstat->repstat[_to].ltime = 0;
                chkstat->repstat[_to].magic = 0;

                __chunk_bh_sync(&args->parent, chkinfo, NULL, 0);
                return 0;
        }

        /* remember the clock so a change made by the vfm check can be logged */
        vfm_clock = vfm->clock;
        if (__chunk_clean_online_usevfm(vfm, &clockstat2, to, chkinfo)) {
                if (vfm->clock != vfm_clock) {
                        vfm_dump(vfm, tmp);
                        DINFO("skip "CHKID_FORMAT" vfm %s vclock %s:%ju,%ju,%u,%u --> %s:%ju,%ju,%u,%u\n",
                              CHKID_ARG(chkid), tmp,
                              network_rname(from), clockstat->vclock.clock,
                              clockstat->vclock.vfm, clockstat->dirty, clockstat->lost,
                              network_rname(to), clockstat2.vclock.clock,
                              clockstat2.vclock.vfm, clockstat2.dirty, clockstat2.lost);
                } else {
                        DBUG("connect "CHKID_FORMAT" fail, skip with vfm\n", CHKID_ARG(&chkinfo->id));
                }

                __chunk_bh_sync(&args->parent, chkinfo, vfm, 0);
                chkstat->repstat[_to].ltime = 0;
                chkstat->repstat[_to].magic = 0;
        } else {
                vfm_dump(vfm, tmp);
                DINFO("force sync "CHKID_FORMAT" vfm %s vclock %s:%ju,%ju,%u,%u --> %s:%ju,%ju,%u,%u\n",
                      CHKID_ARG(chkid), tmp,
                      network_rname(from), clockstat->vclock.clock,
                      clockstat->vclock.vfm, clockstat->dirty, clockstat->lost,
                      network_rname(to), clockstat2.vclock.clock,
                      clockstat2.vclock.vfm, clockstat2.dirty, clockstat2.lost);

                ret = __chunk_recovery_old(args, _from, _to, clockstat);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Handle a clean replica whose node is offline.
 *
 *   - @recover set: rebuild via __chunk_recovery_new(); every error,
 *     ECANCELED included, is returned to the caller.
 *   - no vfm: mark the replica dirty, clear its repstat ltime/magic,
 *     request a background sync and log a warning.
 *   - vfm present, vfm_add_check() returns 0 and vfm->clock is newer than
 *     the source's vclock.vfm: skip the replica (clear ltime/magic, request
 *     a background sync).
 *   - vfm present otherwise: rebuild via __chunk_recovery_new().
 *
 * @return 0 on success, otherwise the error of the failing step.
 */
STATIC int __chunk_clean_offline(args_t *args, int _from, int _to,
                                   const clockstat_t *clockstat, int recover)
{
        int ret;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;
        vfm_t *vfm = args->vfm;
        const nid_t *to = &chkinfo->diskid[_to].id;

        if (recover) {
                ret = __chunk_recovery_new(args, _from, _to, clockstat);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else if (vfm == NULL) {
                ret = chkinfo_set_status(chkinfo, _to, __S_DIRTY);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                chkstat->repstat[_to].ltime = 0;
                chkstat->repstat[_to].magic = 0;

                __chunk_bh_sync(&args->parent, chkinfo, NULL, 0);

                DWARN("chunk "CHKID_FORMAT" @ %s set dirty\n", CHKID_ARG(&chkinfo->id),
                      network_rname(to));
        } else if (vfm_add_check(vfm, to, chkinfo) == 0
                   && vfm->clock > clockstat->vclock.vfm) {
                /* consistency can be left to the vfm */
                DINFO("connect "CHKID_FORMAT" fail, skip with vfm\n", CHKID_ARG(&chkinfo->id));
                __chunk_bh_sync(&args->parent, chkinfo, vfm, 0);
                chkstat->repstat[_to].ltime = 0;
                chkstat->repstat[_to].magic = 0;
        } else {
                ret = __chunk_recovery_new(args, _from, _to, clockstat);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Compare a replica in clean state (slot @_to) against the source replica
 * (slot @_from) and bring it back in line if needed.
 *
 *   - target node offline: delegate to __chunk_clean_offline();
 *   - target node online and its ltime equals the recorded
 *     repstat[_to].ltime: only log, nothing else is done;
 *   - target node online with a different ltime: delegate to
 *     __chunk_clean_online().
 *
 * A vfm is only expected for raw chunks (asserted).
 *
 * @return 0 on success, otherwise the error from the delegated helper.
 */
STATIC int __chunk_clean(args_t *args, int _from, int _to,
                         const clockstat_t *clockstat, int recover)
{
        int ret;
        time_t ltime;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;
        const nid_t *to = &chkinfo->diskid[_to].id;

        if (likely(args->vfm)) {
                YASSERT(chkinfo->id.type == __RAW_CHUNK__);
        }

        /* compare the two replicas */
#if ENABLE_CHUNK_DEBUG
        DINFO("check chunk "CHKID_FORMAT" @ %s\n", CHKID_ARG(&chkinfo->id),
              network_rname(to));
#else
        DBUG("check chunk "CHKID_FORMAT" @ %s\n", CHKID_ARG(&chkinfo->id),
             network_rname(to));
#endif

        if (!__chunk_online(to, &ltime)) {
                /* offline */
                ret = __chunk_clean_offline(args, _from, _to, clockstat, recover);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                return 0;
        }

        /* online */
        if (chkstat->repstat[_to].ltime == ltime) {
#if ENABLE_CHUNK_DEBUG
                DINFO("chunk "CHKID_FORMAT" @ %s\n", CHKID_ARG(&chkinfo->id),
                      network_rname(to));
#else
                DBUG("chunk "CHKID_FORMAT" @ %s\n", CHKID_ARG(&chkinfo->id),
                      network_rname(to));
#endif
                return 0;
        }

        /*
         * reset: if the node hosting the replica is online, prefer
         * recovering onto that same node.  A pulled disk is such a case:
         * recovery reads from several nodes but the node being written to
         * is fixed, which limits recovery parallelism and may become a
         * performance bottleneck.
         */
        ret = __chunk_clean_online(args, _from, _to, clockstat, recover);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

#if 1
/**
 * Bring a dirty replica on an online node up to date: recover it through
 * __chunk_recovery_old() and, on success, set its status to __S_CLEAN.
 *
 * @return 0 on success, otherwise the error from recovery or from
 *         chkinfo_set_status().
 */
STATIC int __chunk_dirty_online(args_t *args, int _from, int _to,
                                const clockstat_t *clockstat1)
{
        int ret;
        chkinfo_t *chkinfo = args->chkinfo;
        const nid_t *to = &chkinfo->diskid[_to].id;

        ret = __chunk_recovery_old(args, _from, _to, clockstat1);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = chkinfo_set_status(chkinfo, _to, __S_CLEAN);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

#if ENABLE_CHUNK_DEBUG
        DINFO("reset "CHKID_FORMAT" @ %s clock %ju\n",
             CHKID_ARG(&chkinfo->id), network_rname(to), clockstat1->vclock.clock);
#else
        DBUG("reset "CHKID_FORMAT" @ %s clock %ju\n",
             CHKID_ARG(&chkinfo->id), network_rname(to), clockstat1->vclock.clock);
#endif

        return 0;
err_ret:
        return ret;
}
#endif

/**
 * Rebuild replica slot @_to from slot @_from through chunk_push_newdisk(),
 * then connect to the target, store the replica state it reports in
 * chkstat->repstat[_to] and set the slot's status to 0 (asserted to be
 * __S_CLEAN afterwards).
 *
 * NOTE(review): chunk_push_newdisk() receives chkinfo non-const, presumably
 * so it can change the location stored in diskid[_to] -- confirm.  `to`
 * points into chkinfo, so the connect below uses whatever id is stored
 * there after the push.
 *
 * @return 0 on success; ECANCELED when fewer fault domains are online than
 *         the chunk has replicas; otherwise the error of the failing step.
 */
STATIC int __chunk_recovery_new(args_t *args, int _from, int _to,
                                const clockstat_t *clockstat)
{
        int ret, online, total;
        const nid_t *to;
        repstat_t repstat;
        clockstat_t clockstat2;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;

        ret = conn_faultdomain(&total, &online);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (online < chkinfo->repnum) {
                ret = ECANCELED;
                GOTO(err_ret, ret);
        }

        to = &chkinfo->diskid[_to].id;

#if ENABLE_CHUNK_DEBUG
        DINFO(""CHKID_FORMAT" need sync immediately\n", CHKID_ARG(&chkinfo->id));
#else
        DBUG(""CHKID_FORMAT" need sync immediately\n", CHKID_ARG(&chkinfo->id));
#endif

        ret = __chunk_push_newdisk(args, _from, _to, &clockstat->vclock);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = chunk_connect(to, &chkinfo->id, &args->parent, 0, &args->token, &clockstat2, &repstat);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        chkstat->repstat[_to] = repstat;
        ret = chkinfo_set_status(chkinfo, _to, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(chkinfo->diskid[_to].status == __S_CLEAN);

        return 0;
err_ret:
        return ret;
}

/**
 * Repair a replica in dirty state (slot @_to) from the source replica
 * (slot @_from).
 *
 * If the target node is online the replica is re-synced in place via
 * __chunk_dirty_online(); otherwise it is rebuilt via
 * __chunk_recovery_new().  A failure of either path is logged as a warning
 * and returned.
 *
 * @return 0 on success, otherwise the error from the chosen path.
 */
STATIC int __chunk_dirty(args_t *args, int _from, int _to,
                         const clockstat_t *clockstat)
{
        int ret;
        time_t ltime;
        chkinfo_t *chkinfo = args->chkinfo;
        const nid_t *to = &chkinfo->diskid[_to].id;

#if ENABLE_CHUNK_DEBUG
        DINFO("check chunk "CHKID_FORMAT" @ %s\n", CHKID_ARG(&chkinfo->id),
              network_rname(to));
#else
        DBUG("check chunk "CHKID_FORMAT" @ %s\n", CHKID_ARG(&chkinfo->id), network_rname(to));
#endif

        if (__chunk_online(to, &ltime))
                ret = __chunk_dirty_online(args, _from, _to, clockstat);
        else
                ret = __chunk_recovery_new(args, _from, _to, clockstat);

        if (unlikely(ret)) {
                DWARN("sync chunk "CHKID_FORMAT" @ %s fail, ret %u\n",
                      CHKID_ARG(&chkinfo->id),
                      network_rname(to), ret);
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Verify a replica in check state (slot @_to) on an online node against the
 * source replica (slot @_from) and mark it clean.
 *
 *   - chunk_connect() fails with EIO/ENODEV: rebuild the replica via
 *     __chunk_recovery_new();
 *   - chunk_connect() fails otherwise: return that error;
 *   - connected but the clock states differ (__chunk_equal): recover via
 *     __chunk_recovery_old();
 *   - connected and equal: just store the returned repstat.
 * On success of any of these the slot's status is set to __S_CLEAN.
 *
 * @return 0 on success, otherwise the error of the failing step.
 */
STATIC int __chunk_needcheck_online(args_t *args, int _from, int _to,
                                    const clockstat_t *clockstat1)
{
        int ret;
        clockstat_t clockstat2;
        const nid_t *from, *to;
        repstat_t repstat;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;
        const chkid_t *chkid = &chkinfo->id;

        from = &chkinfo->diskid[_from].id;
        to = &chkinfo->diskid[_to].id;

        ret = chunk_connect(to, chkid, &args->parent, 0, &args->token, &clockstat2, &repstat);
        if (unlikely(ret)) {
                if (ret != EIO && ret != ENODEV)
                        GOTO(err_ret, ret);

                /* replica not usable on that node: force a full rebuild;
                 * clockstat2/repstat were not filled in and are not read */
                ret = __chunk_recovery_new(args, _from, _to, clockstat1);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else if (!__chunk_equal(chkid, from, clockstat1, to, &clockstat2)) {
                ret = __chunk_recovery_old(args, _from, _to, clockstat1);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                chkstat->repstat[_to] = repstat;
        }

        ret = chkinfo_set_status(chkinfo, _to, __S_CLEAN);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if ENABLE_CHUNK_DEBUG
        DINFO("reset "CHKID_FORMAT" @ %s clock %ju\n",
              CHKID_ARG(&chkinfo->id), network_rname(to), clockstat1->vclock.clock);
#else
        DBUG("reset "CHKID_FORMAT" @ %s clock %ju\n",
             CHKID_ARG(&chkinfo->id), network_rname(to), clockstat1->vclock.clock);
#endif

        return 0;
err_ret:
        return ret;
}

/**
 * Process a replica in check state (slot @_to).
 *
 * An online target is verified with __chunk_needcheck_online(); an offline
 * one is rebuilt with __chunk_recovery_new().  If the chosen path reports
 * ECANCELED the replica is downgraded to __S_DIRTY, a background sync is
 * requested, a warning is logged and 0 is returned.  Any other error is
 * returned as is.
 *
 * @return 0 on success (including the ECANCELED downgrade), otherwise the
 *         error of the failing step.
 */
STATIC int __chunk_needcheck(args_t *args, int _from, int _to, const clockstat_t *clockstat)
{
        int ret;
        time_t ltime;
        chkinfo_t *chkinfo = args->chkinfo;
        const nid_t *to = &chkinfo->diskid[_to].id;

#if ENABLE_CHUNK_DEBUG
        DINFO("check chunk "CHKID_FORMAT" @ %s\n", CHKID_ARG(&chkinfo->id),
             network_rname(to));
#else
        DBUG("check chunk "CHKID_FORMAT" @ %s\n", CHKID_ARG(&chkinfo->id),
             network_rname(to));
#endif

        if (__chunk_online(to, &ltime))
                ret = __chunk_needcheck_online(args, _from, _to, clockstat);
        else
                ret = __chunk_recovery_new(args, _from, _to, clockstat);

        if (unlikely(ret)) {
                if (ret != ECANCELED) {
                        GOTO(err_ret, ret);
                }

                /* recovery was cancelled: fall back to marking the replica dirty */
                ret = chkinfo_set_status(chkinfo, _to, __S_DIRTY);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                __chunk_bh_sync(&args->parent, chkinfo, NULL, 0);
                DWARN("chunk "CHKID_FORMAT" @ %s set dirty\n", CHKID_ARG(&chkinfo->id),
                      network_rname(to));
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Check one replica (slot @to) against the source replica (slot @from),
 * dispatching on the target replica's status.
 *
 *   - __S_CLEAN: __chunk_clean(); its error is returned.
 *   - not clean and @recover is 0: nothing is done.
 *   - __S_DIRTY / __S_CHECK with @recover set: __chunk_dirty() /
 *     __chunk_needcheck(); a failure is only logged at debug level and
 *     the function still returns 0.
 *   - any other status: UNIMPLEMENTED.
 *
 * A snapshot of chkinfo is taken up front so that a change of info_version
 * can be logged as "before --> after".
 *
 * @return 0, or the error from __chunk_clean().
 */
STATIC int __chunk_check____(args_t *args, const clockstat_t *clockstat,
                            int from, int to, int recover)
{
        int ret;
        chkinfo_t *chkinfo = args->chkinfo;
        reploc_t *target = &chkinfo->diskid[to];
        char snapshot[CHKINFO_MAX];
        chkinfo_t *before = (void *)snapshot;
        char before_str[MAX_BUF_LEN], after_str[MAX_BUF_LEN];

        CHKINFO_CP(before, chkinfo);

#if ENABLE_CHUNK_DEBUG
        DINFO("from %d to %d status %d "VCLOCK_FORMAT" dirty %d recover %d\n",
              from, to, target->status, VCLOCK_ARG(&clockstat->vclock), clockstat->dirty, recover);
#else
        DBUG("from %d to %d status %d "VCLOCK_FORMAT" dirty %d recover %d\n",
              from, to, target->status, VCLOCK_ARG(&clockstat->vclock), clockstat->dirty, recover);
#endif

        // TODO: why dispatch on the state of the target replica? what is the important difference?

        if (likely(target->status == __S_CLEAN)) {
                ret = __chunk_clean(args, from, to, clockstat, recover);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else if (!recover) {
                DBUG("chunk "CHKID_FORMAT", nothing todo\n", CHKID_ARG(&chkinfo->id));
        } else if (target->status == __S_DIRTY) {
                ret = __chunk_dirty(args, from, to, clockstat);
                if (unlikely(ret)) {
                        /* best effort: the failure is not propagated */
                        DBUG("chunk "CHKID_FORMAT", nothing todo\n", CHKID_ARG(&chkinfo->id));
                }
        } else if (target->status == __S_CHECK) {
                ret = __chunk_needcheck(args, from, to, clockstat);
                if (unlikely(ret)) {
                        /* best effort: the failure is not propagated */
                        DBUG("chunk "CHKID_FORMAT", nothing todo\n", CHKID_ARG(&chkinfo->id));
                }
        } else {
                UNIMPLEMENTED(__DUMP__);
        }

        schedule_stack_assert(NULL);

        CHKINFO_STR(before, before_str);
        CHKINFO_STR(chkinfo, after_str);
        if (before->info_version != chkinfo->info_version) {
#if ENABLE_CHUNK_DEBUG
                DINFO("%s  --> %s\n", before_str, after_str);
#else
                DBUG("%s  --> %s\n", before_str, after_str);
#endif
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Sync one replica (slot @to) from the source replica (slot @from),
 * dispatching on the target replica's status.
 *
 *   - __S_CLEAN: __chunk_clean() (the only path that uses @recover);
 *   - __S_DIRTY: __chunk_dirty();
 *   - __S_CHECK: __chunk_needcheck();
 *   - any other status: UNIMPLEMENTED.
 * An error from any of the three handlers is returned to the caller.
 *
 * A snapshot of chkinfo is taken up front so that a change of info_version
 * can be logged as "before --> after".
 *
 * @return 0 on success, otherwise the handler's error.
 */
STATIC int __chunk_sync____(args_t *args, const clockstat_t *clockstat,
                            int from, int to, int recover)
{
        int ret;
        chkinfo_t *chkinfo = args->chkinfo;
        reploc_t *target = &chkinfo->diskid[to];
        char snapshot[CHKINFO_MAX];
        chkinfo_t *before = (void *)snapshot;
        char before_str[MAX_BUF_LEN], after_str[MAX_BUF_LEN];

        CHKINFO_CP(before, chkinfo);

#if ENABLE_CHUNK_DEBUG
        DINFO("from %d to %d status %d clock %ju dirty %d recover %d\n",
              from, to, target->status, clockstat->vclock.clock, clockstat->dirty, recover);
#else
        DBUG("from %d to %d status %d clock %ju dirty %d recover %d\n",
              from, to, target->status, clockstat->vclock.clock, clockstat->dirty, recover);
#endif

        // TODO: why dispatch on the state of the target replica? what is the important difference?

        if (likely(target->status == __S_CLEAN)) {
                ret = __chunk_clean(args, from, to, clockstat, recover);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else if (target->status == __S_DIRTY) {
                ret = __chunk_dirty(args, from, to, clockstat);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else if (target->status == __S_CHECK) {
                ret = __chunk_needcheck(args, from, to, clockstat);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                UNIMPLEMENTED(__DUMP__);
        }

        schedule_stack_assert(NULL);

        CHKINFO_STR(before, before_str);
        CHKINFO_STR(chkinfo, after_str);
        if (before->info_version != chkinfo->info_version) {
#if ENABLE_CHUNK_DEBUG
                DINFO("%s  --> %s\n", before_str, after_str);
#else
                DBUG("%s  --> %s\n", before_str, after_str);
#endif
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Check every replica except @idx against the reference replica @idx and
 * repair the ones chunk_replica_consistent() rejects. Afterwards, if the
 * reference clock is flagged lost or dirty, a copy of it with both flags
 * cleared is written to every replica whose ltime is non-zero.
 *
 * @param args      check context (chkinfo, chkstat, vfm)
 * @param clockstat clock state of the reference replica
 * @param idx       index of the reference replica
 * @return 0 on success, otherwise the first error met
 */
STATIC int __chunk_check__(args_t *args, const clockstat_t *clockstat, int idx)
{
        int ret, i, recover;
        const nid_t *nid;
        clockstat_t cleared;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;

        // recovery is requested only for non-raw chunks, and only when
        // gloconf.metadata_consistent is set
        recover = (gloconf.metadata_consistent
                   && chkinfo->id.type != __RAW_CHUNK__) ? 1 : 0;

        for (i = 0; i < (int)chkinfo->repnum; i++) {
                if (i == idx)
                        continue;

                if (likely(chunk_replica_consistent(&chkinfo->id, &chkinfo->diskid[i],
                                                    chkstat->repstat[i].ltime, args->vfm)))
                        continue;

                ret = __chunk_check____(args, clockstat, idx, i, recover);
                if (likely(ret == 0))
                        continue;

                // only a cancelled recovery attempt is worth a second try
                if (ret != ECANCELED || !recover) {
                        GOTO(err_ret, ret);
                }

                ret = __chunk_check____(args, clockstat, idx, i, 0);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        if (likely(!(clockstat->lost || clockstat->dirty)))
                return 0;

        cleared = *clockstat;
        cleared.lost = 0;
        cleared.dirty = 0;

        for (i = 0; i < (int)chkinfo->repnum; i++) {
                nid = &chkinfo->diskid[i].id;

                if (chkstat->repstat[i].ltime == 0)
                        continue;

                if (net_islocal(nid)) {
                        ret = replica_srv_setclock(nid, &chkinfo->id, &cleared);
                } else {
                        ret = replica_rpc_setclock(nid, &chkinfo->id, &cleared);
                }

                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Sync every replica other than @idx that chunk_replica_intact() rejects,
 * using replica @idx as the source and with recovery enabled.
 *
 * @param args      check context (chkinfo, chkstat)
 * @param clockstat clock state handed to __chunk_sync____()
 * @param idx       index of the source replica
 * @return 0 on success, otherwise the first sync error
 */
STATIC int __chunk_sync__(args_t *args, const clockstat_t *clockstat, int idx)
{
        int ret, i;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;

        for (i = 0; i < (int)chkinfo->repnum; i++) {
                // the source itself and intact replicas need no work
                if (i != idx
                    && !chunk_replica_intact(&chkinfo->id, &chkinfo->diskid[i],
                                             chkstat->repstat[i].ltime, NULL)) {
                        ret = __chunk_sync____(args, clockstat, idx, i, 1);
                        if (unlikely(ret)) {
                                GOTO(err_ret, ret);
                        }
                }
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Tell whether a replica location can be used as a source.
 *
 * A replica qualifies when its status is zero, its node is not listed in
 * @vfm and network_connect() to the node succeeds. A failed connect is
 * handled the same way for local and remote nodes.
 *
 * @param chkid  chunk id, used for logging only
 * @param reploc replica location to examine
 * @param vfm    passed to vfm_exist()
 * @param ltime  filled in by network_connect()
 * @return 1 if the replica qualifies, 0 otherwise
 */
STATIC int __chunk_candidate(const chkid_t *chkid,const reploc_t *reploc,
                             const vfm_t *vfm, time_t *ltime)
{
        int usable = 0;

        if (unlikely(reploc->status)) {
                DBUG(CHKID_FORMAT" %s status %u\n", CHKID_ARG(chkid),
                     network_rname(&reploc->id), reploc->status);
        } else if (unlikely(vfm_exist(vfm, &reploc->id))) {
                DBUG(CHKID_FORMAT" %s in vfm\n", CHKID_ARG(chkid),
                     network_rname(&reploc->id));
        } else if (unlikely(network_connect(&reploc->id, ltime, 1, 0))) {
                DBUG(CHKID_FORMAT" connect %s fail\n", CHKID_ARG(chkid),
                     network_rname(&reploc->id));
        } else {
                usable = 1;
        }

        return usable;
}


/**
 * Pick a source replica by connecting to the candidates.
 *
 * Replicas are visited starting at a random offset. Each one accepted by
 * __chunk_candidate() is connected with chunk_connect() (fourth argument 0,
 * presumably "not forced" -- confirm); ENOENT, EIO and ENODEV from the
 * connect skip that replica, any other error aborts. The first pass ignores
 * replicas whose clock is flagged dirty; if that finds nothing, a second
 * pass accepts them. With CHOOSE_NEWEST_CHUNK enabled all replicas are
 * visited and the one with the highest clock wins, otherwise the first
 * match is taken.
 *
 * The outputs are written only on success, and *_idx is always the index of
 * the replica whose clock and repstat are returned.
 *
 * @param chkinfo   chunk description holding the replica locations
 * @param vfm       passed to __chunk_candidate() and dumped on failure
 * @param parent    forwarded to chunk_connect()
 * @param token     forwarded to chunk_connect()
 * @param clockstat out: clock state of the selected replica (also used as
 *                  scratch space for every connect)
 * @param repstat   out: replica state of the selected replica (also scratch)
 * @param _idx      out: index of the selected replica in chkinfo->diskid
 * @return 0 on success, ENONET if no replica qualified, otherwise the
 *         chunk_connect() error
 */
STATIC int __chunk_get_clean2(const chkinfo_t *chkinfo, const vfm_t *vfm,
                              const fileid_t *parent, const lease_token_t *token,
                              clockstat_t *clockstat, repstat_t *repstat, int *_idx)
{
        int ret, i, found, idx, rand;
        int best_idx = -1;
        int dirty_skip = 0;
        const reploc_t *reploc;
        time_t ltime;
        clockstat_t best_clock = {{0, 0}, 0};
        repstat_t best_rep;

        memset(&best_rep, 0x0, sizeof(best_rep));

dirty_retry:
        found = 0;
        rand = fastrandom();
        for (i = 0; i < (int)chkinfo->repnum; i++) {
                idx = (i + rand) % chkinfo->repnum;
                reploc = &chkinfo->diskid[idx];
                if (__chunk_candidate(&chkinfo->id, reploc, vfm, &ltime) == 0) {
                        continue;
                }

                ret = chunk_connect(&reploc->id, &chkinfo->id, parent, 0, token,
                                    clockstat, repstat);
                if (unlikely(ret)) {
                        if (ret == ENOENT || ret == EIO || ret == ENODEV) {
                                DBUG(CHKID_FORMAT" connect %s fail\n", CHKID_ARG(&chkinfo->id),
                                     network_rname(&reploc->id));
                                continue;
                        } else
                                goto err_ret;
                }

                if (dirty_skip == 0 && clockstat->dirty) {
                        continue;
                }

                // remember the index together with the state it belongs to
                if (found == 0 || clockstat->vclock.clock > best_clock.vclock.clock) {
                        best_clock = *clockstat;
                        best_rep = *repstat;
                        best_idx = idx;
                }

                found++;
#if !CHOOSE_NEWEST_CHUNK
                break;
#endif
        }

        if (unlikely(found == 0)) {
                if(dirty_skip == 0) {
                        dirty_skip = 1;
                        goto  dirty_retry;
                }

                ret = ENONET;
#if ENABLE_CHUNK_DEBUG
                CHKINFO_DUMP(chkinfo, D_INFO);
#endif
                char tmp1[MAX_BUF_LEN], tmp2[MAX_BUF_LEN];
                vfm_dump(vfm, tmp1);
                CHKINFO_STR(chkinfo, tmp2);
                DWARN("chkinfo %s vfm %s\n", tmp2, tmp1);
                goto err_ret;
        }

        *_idx = best_idx;
        *clockstat = best_clock;
        *repstat = best_rep;

        return 0;
err_ret:
        return ret;
}

/**
 * Full check, used when no replica counts as connected: select one usable
 * replica with __chunk_get_clean2(), adopt its clock and replica state into
 * chkstat, then check the remaining replicas against it.
 *
 * @param args check context; args->chkstat is updated
 * @return 0 on success, otherwise the error of the selection or of
 *         __chunk_check__()
 */
STATIC int __chunk_check_fully(args_t *args)
{
        int ret, src_idx;
        clockstat_t src_clock;
        repstat_t src_rep;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;

#if ENABLE_CHUNK_DEBUG
        char tmp[MAX_PATH_LEN];
        vfm_dump(args->vfm, tmp);
        DINFO("check chunk "CHKID_FORMAT"  fully, vfm %s\n", CHKID_ARG(&chkinfo->id), tmp);
#else
        DBUG("check chunk "CHKID_FORMAT"  fully\n", CHKID_ARG(&chkinfo->id));
#endif

        // when the state of every replica is off, pick one usable replica
        // and let it overwrite the others
        ret = __chunk_get_clean2(chkinfo, args->vfm, &args->parent, &args->token,
                                 &src_clock, &src_rep, &src_idx);
        if (unlikely(ret)) {
                goto err_ret;
        }

#if ENABLE_CHUNK_DEBUG
        DINFO("update chunk "CHKID_FORMAT" clock from %ju "VCLOCK_FORMAT" dirty %d\n",
             CHKID_ARG(&chkinfo->id), chkstat->chkstat_clock,
             VCLOCK_ARG(&src_clock.vclock), src_clock.dirty);
#else
        DBUG("update chunk "CHKID_FORMAT" clock from %ju "VCLOCK_FORMAT" dirty %d\n",
             CHKID_ARG(&chkinfo->id), chkstat->chkstat_clock,
             VCLOCK_ARG(&src_clock.vclock), src_clock.dirty);
#endif

        // @note repair mode: the replica clock is copied into the chunk
        // clock here (the original author notes this is the only place)
        chkstat->chkstat_clock = src_clock.vclock.clock;
        chkstat->magic = chunk_magic();
        chkstat->repstat[src_idx] = src_rep;

        ret = __chunk_check__(args, &src_clock, src_idx);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Pick a source replica among the replicas that are already connected.
 *
 * Replicas are visited starting at a random offset. A replica is considered
 * only if __chunk_candidate() accepts it and the ltime recorded in chkstat
 * equals the one network_connect() reports now. The first pass ignores
 * replicas whose clock is flagged dirty; if that finds nothing, a second
 * pass accepts them. With CHOOSE_NEWEST_CHUNK enabled the replica with the
 * highest clock wins, otherwise the first match is taken.
 *
 * The outputs are written only on success, and *_idx is always the index of
 * the replica whose clock is returned.
 *
 * @param chkinfo   chunk description holding the replica locations
 * @param chkstat   per-replica state (recorded ltime)
 * @param vfm       passed to __chunk_candidate() and dumped on failure
 * @param _idx      out: index of the selected replica in chkinfo->diskid
 * @param clockstat out: clock state of the selected replica (also used as
 *                  scratch space for every chunk_getclock() call)
 * @return 0 on success, ENONET if no replica qualified, otherwise the
 *         chunk_getclock() error
 */
STATIC int __chunk_get_clean1(const chkinfo_t *chkinfo, const chkstat_t *chkstat, const vfm_t *vfm,
                              int *_idx, clockstat_t *clockstat)
{
        int ret, i, found, idx, rand;
        int best_idx = -1;
        int dirty_skip = 0;
        const reploc_t *reploc;
        time_t ltime;
        const nid_t *nid;
        clockstat_t best_clock = {{0, 0}, 0};

dirty_retry:
        found = 0;
        rand = fastrandom();
        for (i = 0; i < (int)chkinfo->repnum; i++) {
                idx = (i + rand) % chkinfo->repnum;
                reploc = &chkinfo->diskid[idx];
                if (__chunk_candidate(&chkinfo->id, reploc, vfm, &ltime) == 0) {
                        continue;
                }

                if (likely(chkstat->repstat[idx].ltime != ltime)) {
                        continue;
                }

                nid = &reploc->id;
                ret = chunk_getclock(nid, &chkinfo->id, clockstat);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                if (dirty_skip == 0 && clockstat->dirty) {
                        continue;
                }

                // remember the index together with the clock it belongs to
                if (found == 0 || clockstat->vclock.clock > best_clock.vclock.clock) {
                        best_clock = *clockstat;
                        best_idx = idx;
                }

                found++;
#if !CHOOSE_NEWEST_CHUNK
                break;
#endif
        }

        if (unlikely(found == 0)) {
                if(dirty_skip == 0) {
                        dirty_skip = 1;
                        goto  dirty_retry;
                }

                ret = ENONET;
                char buf[MAX_BUF_LEN], vfm_buf[MAX_BUF_LEN];
                vfm_dump(vfm, vfm_buf);
                CHKINFO_STR(chkinfo, buf);
                DINFO("%s, %s\n", buf, vfm_buf);
                GOTO(err_ret, ret);
        }

        *_idx = best_idx;
        *clockstat = best_clock;

        return 0;
err_ret:
        return ret;
}

/**
 * Collect every connected replica that may serve as a source.
 *
 * A replica is collected when __chunk_candidate() accepts it, its ltime in
 * chkstat equals the one reported by network_connect(), and
 * chunk_getclock() succeeds. The first pass leaves out replicas whose clock
 * is flagged dirty; a second pass that accepts them runs only if the first
 * pass found nothing.
 *
 * @param chkinfo     chunk description holding the replica locations
 * @param chkstat     per-replica state (recorded ltime)
 * @param vfm         passed to __chunk_candidate() and dumped on failure
 * @param idx_array   out: indexes of the collected replicas
 * @param clock_array out: clock state of the collected replicas
 * @param _found      out: number of entries written, set on success only
 * @return 0 on success, ENONET if nothing was collected, otherwise the
 *         chunk_getclock() error
 */
STATIC int __chunk_get_clean__(const chkinfo_t *chkinfo, const chkstat_t *chkstat, const vfm_t *vfm,
                               int *idx_array, clockstat_t *clock_array, int *_found)
{
        int ret, i, idx, rand, accept_dirty;
        int found = 0;
        const reploc_t *reploc;
        time_t ltime;
        clockstat_t clockstat;

        for (accept_dirty = 0; accept_dirty <= 1; accept_dirty++) {
                found = 0;
                rand = fastrandom();

                for (i = 0; i < (int)chkinfo->repnum; i++) {
                        idx = (i + rand) % chkinfo->repnum;
                        reploc = &chkinfo->diskid[idx];

                        if (__chunk_candidate(&chkinfo->id, reploc, vfm, &ltime) == 0)
                                continue;

                        if (likely(chkstat->repstat[idx].ltime != ltime))
                                continue;

                        ret = chunk_getclock(&reploc->id, &chkinfo->id, &clockstat);
                        if (unlikely(ret)) {
                                GOTO(err_ret, ret);
                        }

                        if (!accept_dirty && clockstat.dirty)
                                continue;

                        clock_array[found] = clockstat;
                        idx_array[found] = idx;
                        found++;
                }

                // a non-empty result ends the search
                if (likely(found != 0))
                        break;
        }

        if (unlikely(found == 0)) {
                char buf[MAX_BUF_LEN], tmp[MAX_BUF_LEN];

                ret = ENONET;
                vfm_dump(vfm, tmp);
                CHKINFO_STR(chkinfo, buf);
                DINFO("%s, %s\n", buf, tmp);
                GOTO(err_ret, ret);
        }

        *_found = found;

        return 0;
err_ret:
        return ret;
}

/**
 * Choose one source replica out of the set gathered by
 * __chunk_get_clean__(). A single result is returned as is; with several
 * results netable_select() picks the node and the matching entry is
 * returned.
 *
 * @param chkinfo   chunk description holding the replica locations
 * @param chkstat   per-replica state
 * @param vfm       forwarded to __chunk_get_clean__()
 * @param _idx      out: index of the chosen replica in chkinfo->diskid
 * @param clockstat out: clock state of the chosen replica
 * @return 0 on success, otherwise the error of __chunk_get_clean__()
 */
STATIC int __chunk_get_clean(const chkinfo_t *chkinfo, const chkstat_t *chkstat, const vfm_t *vfm,
                             int *_idx, clockstat_t *clockstat)
{
        int ret, i, count, idx_array[LICH_REPLICA_MAX];
        clockstat_t clock_array[LICH_REPLICA_MAX];
        nid_t nids[LICH_REPLICA_MAX], nid;

        ret = __chunk_get_clean__(chkinfo, chkstat, vfm, idx_array, clock_array, &count);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        // nothing to balance with a single result
        if (count == 1) {
                *_idx = idx_array[0];
                *clockstat = clock_array[0];
                return 0;
        }

        for (i = 0; i < count; i++) {
                nids[i] = chkinfo->diskid[idx_array[i]].id;

                DBUG("array[%u] @ %s clock %ju\n", i, network_rname(&nids[i]), clock_array[i].vclock.clock);
        }

        netable_select(nids, count, &nid);

        // locate the entry that belongs to the selected node
        i = 0;
        while (i < count && nid_cmp(&nid, &chkinfo->diskid[idx_array[i]].id) != 0)
                i++;

        if (i < count) {
                *_idx = idx_array[i];
                *clockstat = clock_array[i];

                DBUG(CHKID_FORMAT"clean replica @ %s clock %ju\n", CHKID_ARG(&chkinfo->id),
                     network_rname(&chkinfo->diskid[idx_array[i]].id), clock_array[i].vclock.clock);
        }

        YASSERT(i < count);

        return 0;
err_ret:
        return ret;
}

/**
 * Partial check, used when some replicas are still connected: select a
 * connected replica with __chunk_get_clean1() and check the others against
 * it. Gives up with EAGAIN when the clock of the selected replica differs
 * from chkstat->chkstat_clock (logged as "in writing").
 *
 * @param args check context
 * @return 0 on success, EAGAIN on a clock mismatch, otherwise the error of
 *         the selection or of __chunk_check__()
 */
STATIC int __chunk_check_partly(args_t *args)
{
        int ret, src_idx = 0;
        clockstat_t src_clock;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;

#if ENABLE_CHUNK_DEBUG
        DINFO("check chunk "CHKID_FORMAT"  partly\n", CHKID_ARG(&chkinfo->id));
        CHKINFO_DUMP(chkinfo, D_INFO);
#else
        DBUG("check chunk "CHKID_FORMAT"  partly\n", CHKID_ARG(&chkinfo->id));
#endif

        ret = __chunk_get_clean1(chkinfo, chkstat, args->vfm, &src_idx, &src_clock);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        // a mismatch is reported to the caller instead of being asserted
        if (unlikely(src_clock.vclock.clock != chkstat->chkstat_clock)) {
#if ENABLE_CHUNK_DEBUG
                DINFO("chunk "CHKID_FORMAT" in writing clock %ju --> %ju\n",
                      CHKID_ARG(&chkinfo->id), src_clock.vclock.clock,
                      chkstat->chkstat_clock);
#else
                DBUG("chunk "CHKID_FORMAT" in writing clock %ju --> %ju\n",
                     CHKID_ARG(&chkinfo->id), src_clock.vclock.clock,
                     chkstat->chkstat_clock);
#endif
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

#if ENABLE_CHUNK_DEBUG
        DINFO("update chunk "CHKID_FORMAT" clock from %ju "VCLOCK_FORMAT" dirty %d\n",
             CHKID_ARG(&chkinfo->id), chkstat->chkstat_clock,
             VCLOCK_ARG(&src_clock.vclock), src_clock.dirty);
#else
        DBUG("update chunk "CHKID_FORMAT" clock from %ju "VCLOCK_FORMAT" dirty %d\n",
             CHKID_ARG(&chkinfo->id), chkstat->chkstat_clock,
             VCLOCK_ARG(&src_clock.vclock), src_clock.dirty);
#endif

        // the reference clock is taken from chkstat with the dirty flag
        // cleared; the lost flag is left as reported
        src_clock.vclock.clock = chkstat->chkstat_clock;
        src_clock.dirty = 0;

        ret = __chunk_check__(args, &src_clock, src_idx);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Call chunk_connect() with its fourth argument set to 1 on every replica
 * of the chunk and reset the recorded ltime of each replica to 0, whether
 * the connect succeeded or not. On success the rest of the returned replica
 * state is stored in chkstat as well. Failures are only logged.
 *
 * @param parent  forwarded to chunk_connect()
 * @param token   forwarded to chunk_connect()
 * @param chkinfo chunk description holding the replica locations
 * @param chkstat per-replica state to update
 */
STATIC void __chunk_proto_connect_force(const chkid_t *parent, const lease_token_t *token, const chkinfo_t *chkinfo, chkstat_t *chkstat)
{
        int ret, i;
        const nid_t *nid;
        clockstat_t clockstat;
        repstat_t repstat;

#if ENABLE_CHUNK_DEBUG
        DINFO("force connect "CHKID_FORMAT"\n", CHKID_ARG(&chkinfo->id));
#else
        DBUG("force connect "CHKID_FORMAT"\n", CHKID_ARG(&chkinfo->id));
#endif

        for (i = 0; i < (int)chkinfo->repnum; i++) {
                nid = &chkinfo->diskid[i].id;
                ret = chunk_connect(nid, &chkinfo->id, parent, 1, token, &clockstat, &repstat);
                if (likely(ret == 0)) {
                        repstat.ltime = 0;
                        chkstat->repstat[i] = repstat;
                } else {
                        DBUG("connect "CHKID_FORMAT" @%s ret %u %s\n",
                              CHKID_ARG(&chkinfo->id), network_rname(nid), ret, strerror(ret));
                        // @note resetting ltime here is very important
                        chkstat->repstat[i].ltime = 0;
                }
        }
}

/**
 * Run one check pass: a full check when chunk_proto_connected() reports 0,
 * a partial check otherwise.
 *
 * @param args check context
 * @return 0 on success, otherwise the error of the selected check; EPERM
 *         from the full check is passed up without going through GOTO()
 */
STATIC int __chunk_proto_rep_check(args_t *args)
{
        int ret;

        ANALYSIS_BEGIN(0);

        if (chunk_proto_connected(args->chkinfo, args->chkstat, args->vfm) == 0) {
                // after a volume is loaded the ltime of every replica is 0,
                // so this branch is taken; the clock of the selected
                // replica is copied into chkstat_clock
                ret = __chunk_check_fully(args);
                if (unlikely(ret)) {
                        if (ret != EPERM) {
                                GOTO(err_ret, ret);
                        }

                        goto err_ret;
                }
        } else {
                ret = __chunk_check_partly(args);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        ANALYSIS_QUEUE(0, IO_WARN, "__chunk_proto_rep_check");

        return 0;
err_ret:
        return ret;
}

/**
 * Make sure the replicas of a chunk are consistent, repairing if needed.
 *
 * Returns at once when chunk_proto_consistent() already holds. Otherwise
 * __chunk_proto_rep_check() is run; after a failure all replicas are
 * force-reconnected and the check is repeated, at most once for EIO and at
 * most once for EPERM (the latter only when @force is set).
 *
 * @param pool    pool name; kept in the check context and logged
 * @param chkinfo chunk description
 * @param chkstat chunk state; the forced reconnect writes to it
 * @param vfm     when non-NULL the chunk is asserted to be a raw chunk;
 *                replaced by NULL when ENABLE_VFM is off
 * @param parent  id of the parent, copied into the check context
 * @param force   non-zero: an EPERM failure triggers one forced reconnect
 *                followed by a retry
 * @param token   lease token, copied into the check context and used for
 *                the forced reconnect
 * @param ec      unused
 * @param oflags  optional; receives args.oflags after every check pass
 * @return 0 on success, otherwise the error of the last check pass
 */

int chunk_proto_rep_check(const char *pool, chkinfo_t *chkinfo, chkstat_t *chkstat,
                          vfm_t *vfm, const fileid_t *parent,
                          int force, const lease_token_t *token,
                          const ec_t *ec, int *oflags)
{
        int ret, eperm_retried = 0, eio_retried = 0;
        args_t args;

        if (likely(vfm)) {
                YASSERT(chkinfo->id.type == __RAW_CHUNK__);
        }

#if !ENABLE_VFM
        vfm = NULL;
#endif

        (void) ec;

        // fast path: nothing to do for a consistent chunk
        if (likely(chunk_proto_consistent(chkinfo, chkstat, vfm)))
                return 0;

#if ENABLE_CHUNK_DEBUG
        chkinfo_rack_check(chkinfo);
        CHKINFO_DUMP(chkinfo, D_INFO);
        DINFO("check "CHKID_FORMAT"  force %d\n", CHKID_ARG(&chkinfo->id), force);
#else
        DBUG("check "CHKID_FORMAT" repnum %d force %d\n",
             CHKID_ARG(&chkinfo->id), chkinfo->repnum, force);
#endif

        args_init(&args, pool, parent, chkinfo, chkstat, vfm, token);

        for (;;) {
                args.oflags = 0;
                ret = __chunk_proto_rep_check(&args);
                if (oflags) {
                        *oflags = args.oflags;

                        DBUG("pool %s chunk "CHKID_FORMAT" oflags %d\n", pool, CHKID_ARG(&chkinfo->id), *oflags);
                }

                if (likely(ret == 0))
                        break;

                DBUG("check "CHKID_FORMAT" fail\n", CHKID_ARG(&chkinfo->id));

                /*
                 * __replica_read returns EIO in chunk_push if the disk of
                 * the source node is lost
                 *
                 * @see __replica_srv_connect EPERM
                 * @see __replica_srv_connect ENODEV
                 */
                if (ret == EIO && eio_retried == 0) {
                        __chunk_proto_connect_force(&args.parent, token, chkinfo, chkstat);
                        eio_retried = 1;
                } else if (ret == EPERM && force && eperm_retried == 0) {
                        __chunk_proto_connect_force(&args.parent, token, chkinfo, chkstat);
                        eperm_retried = 1;
                } else {
                        GOTO(err_ret, ret);
                }
        }

#if ENABLE_CHUNK_DEBUG
        CHKINFO_DUMP(chkinfo, D_INFO);
#endif

        return 0;
err_ret:
        return ret;
}

/**
 * Build the list of nodes a read may be sent to.
 *
 * A replica is listed when its recorded ltime is non-zero, its status is 0,
 * network_connect() succeeds and the ltime it reports equals the recorded
 * one. Replicas that are unconnected or have a non-zero status are skipped
 * if they are in @vfm or marked __S_DIRTY / __S_CHECK; any other such
 * replica makes the call fail with ESTALE.
 *
 * @param chkinfo chunk description holding the replica locations
 * @param chkstat per-replica state (recorded ltime)
 * @param vfm     passed to vfm_exist()
 * @param nid     out: ids of the usable nodes
 * @param _count  out: number of ids written, set on success only
 * @return 0 on success; ESTALE for an unexpected or reset replica; ENONET
 *         if no node is usable; otherwise the network_connect() error
 */
STATIC int IO_FUNC __chunk_read_getnode(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                                        const vfm_t *vfm, nid_t *nid, int *_count)
{
        int ret, i, count = 0;
        const reploc_t *reploc;
        const repstat_t *repstat;
        char buf[MAX_BUF_LEN];
        time_t ltime;

        for (i = 0; i < (int)chkinfo->repnum; i++) {
                reploc = &chkinfo->diskid[i];
                repstat = &chkstat->repstat[i];

                if (unlikely((repstat->ltime == 0) || (reploc->status != 0))) {
                        if (!(vfm_exist(vfm, &reploc->id)
                              || (reploc->status == __S_DIRTY)
                              || (reploc->status == __S_CHECK))) {
                                CHKINFO_DUMP(chkinfo, D_INFO);
                                vfm_dump(vfm, buf);
                                DWARN("chunk "CHKID_FORMAT"@ %s ltime %u, status %u, vfm %s\n",
                                       CHKID_ARG(&chkinfo->id), network_rname(&reploc->id),
                                       (int)repstat->ltime, reploc->status, buf);

                                // TODO not call chunk_check?
                                ret = ESTALE;
                                GOTO(err_ret, ret);
                        }

#if ENABLE_CHUNK_DEBUG
                        DINFO("chunk "CHKID_FORMAT"@ %s not connected, ltime %ld status %d vfm exists %d\n",
                             CHKID_ARG(&chkinfo->id), network_rname(&reploc->id), repstat->ltime, reploc->status, vfm_exist(vfm, &reploc->id));
#else
                        DBUG("chunk "CHKID_FORMAT"@ %s not connected, ltime %ld status %d vfm exists %d\n",
                             CHKID_ARG(&chkinfo->id), network_rname(&reploc->id), repstat->ltime, reploc->status, vfm_exist(vfm, &reploc->id));
#endif
                        continue;
                }

                ret = network_connect(&reploc->id, &ltime, 0, 0);
                if (unlikely(ret)) {
                        // an unreachable node is tolerated only if it is in vfm
                        if (!vfm_exist(vfm, &reploc->id)) {
                                GOTO(err_ret, ret);
                        }

                        DINFO("chunk "CHKID_FORMAT"@ %s not connected, ltime %ld status %d vfm exists %d\n",
                              CHKID_ARG(&chkinfo->id), network_rname(&reploc->id), repstat->ltime, reploc->status, vfm_exist(vfm, &reploc->id));
                        continue;
                }

                if (repstat->ltime != ltime) {
                        DWARN("chunk "CHKID_FORMAT"@ %s reset, ltime %ld status %d vfm exists %d\n",
                              CHKID_ARG(&chkinfo->id), network_rname(&reploc->id), repstat->ltime, reploc->status, vfm_exist(vfm, &reploc->id));
                        ret = ESTALE;
                        GOTO(err_ret, ret);
                }

                nid[count++] = reploc->id;
        }

        if (unlikely(count == 0)) {
                ret = ENONET;
                CHKINFO_DUMP(chkinfo, D_INFO);
                GOTO(err_ret, ret);
        }

        *_count = count;

        return 0;
err_ret:
        return ret;
}

/**
 * Read from one of the given nodes.
 *
 * With gloconf.rdma set, the first local node in the list is used, or the
 * first entry if none is local; otherwise netable_select() chooses. The
 * read goes through replica_srv_read() for a local node and
 * replica_rpc_read() for a remote one. EPERM, EIO and ENODEV from the read
 * are turned into EAGAIN, and after any read failure
 * network_ltime_reset() is called for the node.
 *
 * @param _nid          candidate nodes
 * @param replica_count number of entries in @_nid
 * @param io            I/O descriptor of the read
 * @param buf           buffer handed to the read routine
 * @return 0 on success, otherwise the (possibly remapped) error
 */
STATIC int IO_FUNC __chunk_read_commit(const nid_t *_nid,  int replica_count,
                               const io_t *io, buffer_t *buf)
{
        int ret, i;
        const nid_t *nid = NULL;
        time_t ltime;
        nid_t dist;

        if (likely(gloconf.rdma)) {
                for (i = 0; i < replica_count && nid == NULL; i++) {
                        if (net_islocal(&_nid[i]))
                                nid = &_nid[i];
                }

                if (nid == NULL)
                        nid = &_nid[0];
        } else {
                netable_select(_nid, replica_count, &dist);
                nid = &dist;
        }

        ret = network_connect(nid, &ltime, 1, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (likely(net_islocal(nid))) {
                DBUG("read "CHKID_FORMAT" %u %ju local\n",
                     CHKID_ARG(&io->id), io->size, io->offset);
                ret = replica_srv_read(nid, io, buf);
        } else {
                DBUG("read "CHKID_FORMAT" %u %ju remote @ %s\n",
                     CHKID_ARG(&io->id), io->size, io->offset, network_rname(nid));
                ret = replica_rpc_read(nid, io, buf);
        }

        if (unlikely(ret)) {
                // these three are reported to the caller as EAGAIN
                switch (ret) {
                case EPERM:
                case EIO:
                case ENODEV:
                        ret = EAGAIN;
                        break;
                default:
                        break;
                }

                GOTO(err_close, ret);
        }

        return 0;
err_close:
        network_ltime_reset(nid, ltime, "__chunk_read_commit");
err_ret:
        return ret;
}

/**
 * Read a range of a replicated chunk: collect the usable nodes with
 * __chunk_read_getnode() and read from one of them through
 * __chunk_read_commit().
 *
 * @param chkinfo chunk description
 * @param chkstat per-replica state
 * @param vfm     asserted non-NULL for raw chunks when ENABLE_VFM is on;
 *                replaced by NULL otherwise
 * @param io      I/O descriptor; size + offset must not exceed
 *                LICH_CHUNK_SPLIT
 * @param buf     buffer that receives the data
 * @param ec      unused
 * @return 0 on success; on failure the error code, with ENOENT reported as
 *         EREMCHG
 */
int IO_FUNC chunk_proto_rep_read(const chkinfo_t *chkinfo, const chkstat_t *chkstat, const vfm_t *vfm,
                         const io_t *io, buffer_t *buf, const ec_t *ec)
{
        int ret, nr_nodes;
        nid_t nodes[LICH_REPLICA_MAX];

        /* NOTE(review): the remark here used to read "temporary disable for
         * ec bug", yet the lease check below is active */
        SCHEDULE_LEASE_CHECK(err_ret, ret);

#if ENABLE_VFM
        if (likely(chkinfo->id.type == __RAW_CHUNK__)) {
                YASSERT(vfm);
        }
#else
        vfm = NULL;
#endif

        (void) ec;
#if ENABLE_CHUNK_DEBUG
        DINFO("read "CHKID_FORMAT" %u %ju\n",
              CHKID_ARG(&chkinfo->id), io->size, io->offset);
#else
        DBUG("read "CHKID_FORMAT" %u %ju\n",
             CHKID_ARG(&chkinfo->id), io->size, io->offset);
#endif

        YASSERT(io->size + io->offset <= LICH_CHUNK_SPLIT);

        ANALYSIS_BEGIN(0);

        // select the valid replicas
        ret = __chunk_read_getnode(chkinfo, chkstat, vfm, nodes, &nr_nodes);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __chunk_read_commit(nodes, nr_nodes, io, buf);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(0, IO_WARN, "chunk_proto_read");

        return 0;
err_ret:
        if (ret == ENOENT)
                ret = EREMCHG;

        return ret;
}

/**
 * Build the list of nodes a write has to be sent to, together with the
 * magic recorded for each of them.
 *
 * A replica is listed when its recorded ltime is non-zero, its status is 0,
 * network_connect() succeeds and the ltime it reports equals the recorded
 * one. Replicas that are unconnected or have a non-zero status are skipped
 * if they are in @vfm or marked __S_DIRTY / __S_CHECK; any other such
 * replica makes the call fail with ESTALE.
 *
 * @param chkinfo chunk description holding the replica locations
 * @param chkstat per-replica state (recorded ltime and magic)
 * @param vfm     passed to vfm_exist()
 * @param nid     out: ids of the usable nodes
 * @param magic   out: repstat magic of each listed node, parallel to @nid
 * @param _count  out: number of entries written, set on success only
 * @return 0 on success; ESTALE for an unexpected or reset replica; ENONET
 *         if no node is usable; otherwise the network_connect() error
 */
STATIC int __chunk_write_getnode(const chkinfo_t *chkinfo,
                                 const chkstat_t *chkstat, const vfm_t *vfm, nid_t *nid,
                                 uint32_t *magic, int *_count)
{
        int ret, i, count = 0;
        const reploc_t *reploc;
        const repstat_t *repstat;
        char buf[MAX_BUF_LEN];
        time_t ltime;

        for (i = 0; i < (int)chkinfo->repnum; i++) {
                reploc = &chkinfo->diskid[i];
                repstat = &chkstat->repstat[i];

                if (unlikely((repstat->ltime == 0) || (reploc->status != 0))) {
                        if (!(vfm_exist(vfm, &reploc->id)
                              || (reploc->status == __S_DIRTY)
                              || (reploc->status == __S_CHECK))) {
                                CHKINFO_DUMP(chkinfo, D_INFO);
                                vfm_dump(vfm, buf);
                                DWARN("chunk "CHKID_FORMAT"@ %s ltime %u, status %u, vfm %s\n",
                                       CHKID_ARG(&chkinfo->id), network_rname(&reploc->id),
                                       (int)repstat->ltime, reploc->status, buf);

                                ret = ESTALE;
                                GOTO(err_ret, ret);
                        }

#if ENABLE_CHUNK_DEBUG
                        DINFO("chunk "CHKID_FORMAT"@ %s not connected, ltime %ld status %d vfm exists %d\n",
                             CHKID_ARG(&chkinfo->id), network_rname(&reploc->id), repstat->ltime, reploc->status, vfm_exist(vfm, &reploc->id));
#else
                        DBUG("chunk "CHKID_FORMAT"@ %s not connected, ltime %ld status %d vfm exists %d\n",
                             CHKID_ARG(&chkinfo->id), network_rname(&reploc->id), repstat->ltime, reploc->status, vfm_exist(vfm, &reploc->id));
#endif
                        continue;
                }

                ret = network_connect(&reploc->id, &ltime, 0, 0);
                if (unlikely(ret)) {
                        // an unreachable node is tolerated only if it is in vfm
                        if (!vfm_exist(vfm, &reploc->id)) {
                                GOTO(err_ret, ret);
                        }

                        DBUG("chunk "CHKID_FORMAT"@ %s not connected, ltime %ld status %d vfm exists %d\n",
                              CHKID_ARG(&chkinfo->id), network_rname(&reploc->id), repstat->ltime, reploc->status, vfm_exist(vfm, &reploc->id));
                        continue;
                }

                if (repstat->ltime != ltime) {
                        DWARN("chunk "CHKID_FORMAT"@ %s reset, ltime %ld status %d vfm exists %d\n",
                              CHKID_ARG(&chkinfo->id), network_rname(&reploc->id), repstat->ltime, reploc->status, vfm_exist(vfm, &reploc->id));
                        ret = ESTALE;
                        GOTO(err_ret, ret);
                }

                nid[count] = reploc->id;
                magic[count] = repstat->magic;
                count++;
        }

        if (unlikely(count == 0)) {
                ret = ENONET;
                CHKINFO_DUMP(chkinfo, D_INFO);
                GOTO(err_ret, ret);
        }

        *_count = count;

        return 0;
err_ret:
        return ret;
}

/*
 * Per-replica context of one write sub-task (__chunk_proto_write__).
 * One instance is filled in by __chunk_proto_write() for every target replica.
 */
typedef struct {
        io_t io;                /* private copy of the caller's io descriptor */
        const nid_t *nid;       /* target node of this sub-task */
        const buffer_t *buf;    /* payload, shared by all sub-tasks */
        uint32_t magic;         /* magic passed to replica_{srv,rpc}_write */

        task_t *task;           /* parent task, resumed by the last sub-task */
        int *count;             /* shared counter of sub-tasks still running */
        int retval;             /* result of this sub-task, 0 on success */
} chunk_write_ctx_t;

/**
 * Sub-task body: write the payload to a single replica.
 *
 * A raw chunk that lives on the local node is written through
 * replica_srv_write(); everything else goes through replica_rpc_write().
 * EPERM, EIO and ENODEV from either call are reported as EAGAIN, and a failed
 * write resets the ltime of the target node. The outcome is stored in
 * ctx->retval; the sub-task that brings the shared counter down to zero
 * resumes the parent task.
 *
 * @param arg  chunk_write_ctx_t of this sub-task
 */
STATIC void __chunk_proto_write__(void *arg)
{
        int ret, local_raw;
        time_t ltime;
        chunk_write_ctx_t *ctx = arg;

        ANALYSIS_BEGIN(0);

        ret = network_connect(ctx->nid, &ltime, 1, 0);
        if (unlikely(ret)) {
                DBUG("chkid "CHKID_FORMAT" nid %d (%s) ret %d\n",
                      CHKID_ARG(&ctx->io.id),
                      ctx->nid->id,
                      network_rname(ctx->nid),
                      ret);
                GOTO(err_ret, ret);
        }

        local_raw = net_islocal(ctx->nid) && ctx->io.id.type == __RAW_CHUNK__;
        if (local_raw) {
                ret = replica_srv_write(ctx->nid, &ctx->io, ctx->buf, ctx->magic);
        } else {
                ret = replica_rpc_write(ctx->nid, &ctx->io, ctx->buf, ctx->magic);
        }

        if (unlikely(ret)) {
                /* a local failure is logged as a warning, a remote one at debug level */
                if (local_raw) {
                        DWARN("chkid "CHKID_FORMAT" nid %d (%s) ret %d\n",
                              CHKID_ARG(&ctx->io.id),
                              ctx->nid->id,
                              network_rname(ctx->nid),
                              ret);
                } else {
                        DBUG("chkid "CHKID_FORMAT" nid %d (%s) ret %d\n",
                              CHKID_ARG(&ctx->io.id),
                              ctx->nid->id,
                              network_rname(ctx->nid),
                              ret);
                }

                switch (ret) {
                case EPERM:
                case EIO:
                case ENODEV:
                        ret = EAGAIN;
                        break;
                default:
                        break;
                }

                GOTO(err_close, ret);
        }

        YASSERT(*ctx->count > 0);

        ctx->retval = 0;
        if (--(*ctx->count) == 0)
                schedule_resume(ctx->task, 0, NULL);

        ANALYSIS_QUEUE(0, IO_WARN, "__chunk_proto_write__");

        return;
err_close:
        network_ltime_reset(ctx->nid, ltime, "__chunk_proto_write__");
err_ret:
        ctx->retval = ret;
        if (--(*ctx->count) == 0)
                schedule_resume(ctx->task, 0, NULL);

        return;
}

/**
 * Write one io to all given replicas in parallel and wait for completion.
 *
 * One sub-task (__chunk_proto_write__) is started per replica; the calling
 * task then yields until the last sub-task resumes it. The sub-tasks keep
 * pointers to this frame (their context, the task handle and the shared
 * counter).
 *
 * @param _nid           target nodes, replica_count entries
 * @param io             io descriptor, copied into every sub-task context
 * @param _buf           payload shared by all sub-tasks
 * @param magic          per-replica magic values, parallel to @_nid
 * @param replica_count  number of replicas, 1..LICH_REPLICA_MAX
 * @return 0 when every replica was written; EAGAIN when at least one
 *         sub-task failed; otherwise the error of schedule_yield()
 */
STATIC int __chunk_proto_write(const nid_t *_nid, const io_t *io,  const buffer_t *_buf,
                               const uint32_t *magic, int replica_count)
{
        int ret, i, success = 0, count;
        chunk_write_ctx_t _ctx[LICH_REPLICA_MAX], *ctx;
        task_t task;

        /*
         * _ctx is a fixed-size stack array indexed by replica_count, and with
         * no replica at all no sub-task would ever resume this task.
         */
        YASSERT(replica_count > 0 && replica_count <= LICH_REPLICA_MAX);

        count = replica_count;
        task = schedule_task_get();
        for (i = 0; i < replica_count; i++) {
                ctx = &_ctx[i];
                ctx->buf = _buf;
                ctx->io = *io;
                ctx->nid = &_nid[i];
                ctx->magic = magic[i];
                ctx->task = &task;
                ctx->count = &count;
                schedule_task_new("chunk_proto_write", __chunk_proto_write__, ctx, -1);
        }

        /* woken up by the sub-task that drops count to zero */
        ret = schedule_yield("replica_wait", NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (i = 0; i < replica_count; i++) {
                ctx = &_ctx[i];
                if (ctx->retval == 0)
                        success++;
        }

        /* a partial write is reported as a whole-write failure */
        if (unlikely(success != replica_count)) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * core_request() entry point for a replicated write.
 *
 * Unpacks, in this order, the node array, io descriptor, payload buffer,
 * magic array and replica count from @ap and forwards them to
 * __chunk_proto_write().
 *
 * @param ap  argument pack handed over by core_request()
 * @return result of __chunk_proto_write()
 */
STATIC int __chunk_proto_write_request(va_list ap)
{
        int ret;
        const nid_t *targets = va_arg(ap, const  nid_t *);
        const io_t *req = va_arg(ap, const io_t *);
        const buffer_t *payload = va_arg(ap, const buffer_t *);
        const uint32_t *magics = va_arg(ap, const uint32_t *);
        int ntargets = va_arg(ap, int);

        va_end(ap);

        ret = __chunk_proto_write(targets, req, payload, magics, ntargets);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Write @buf to every usable replica of a chunk.
 *
 * The target replicas are selected with __chunk_write_getnode(). When called
 * from inside a scheduler task the write is issued directly, otherwise it is
 * handed to the core chosen by core_hash() through core_request().
 *
 * @param chkinfo  chunk descriptor
 * @param chkstat  per-replica state of the chunk
 * @param vfm      asserted non-NULL for raw chunks when ENABLE_VFM is set;
 *                 forced to NULL when it is not
 * @param io       io descriptor; size + offset must not exceed LICH_CHUNK_SPLIT
 * @param buf      payload
 * @param ec       unused
 * @return 0 on success, otherwise an errno value; ENOENT is reported to the
 *         caller as EREMCHG
 */
int IO_FUNC chunk_proto_rep_write(const chkinfo_t *chkinfo, const chkstat_t *chkstat, const vfm_t *vfm,
                          const io_t *io, const buffer_t *buf, const ec_t *ec)
{
        int ret, replica_count;
        uint32_t magic[LICH_REPLICA_MAX];
        nid_t nid[LICH_REPLICA_MAX];

#if ENABLE_VFM
        if (likely(chkinfo->id.type == __RAW_CHUNK__)) {
                YASSERT(vfm);
        }
#else
        vfm = NULL;
#endif

        /* temporary disable for ec bug */
        /* NOTE(review): the remark above does not match the active lease
         * check below - confirm whether it is stale */
        SCHEDULE_LEASE_CHECK(err_ret, ret);

        (void) ec;
        ANALYSIS_BEGIN(0);

#if ENABLE_CHUNK_DEBUG
        DINFO("write "CHKID_FORMAT" %u %ju, "VCLOCK_FORMAT"\n",
              CHKID_ARG(&chkinfo->id), io->size, io->offset, VCLOCK_ARG(&io->vclock));
#else
        DBUG("write "CHKID_FORMAT" %u %ju, "VCLOCK_FORMAT"\n",
              CHKID_ARG(&chkinfo->id), io->size, io->offset, VCLOCK_ARG(&io->vclock));
#endif

        /* a single io must stay within LICH_CHUNK_SPLIT */
        YASSERT(io->size + io->offset <= LICH_CHUNK_SPLIT);

        ret = __chunk_write_getnode(chkinfo, chkstat, vfm, nid, magic, &replica_count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (likely(schedule_running())) {
                /* already inside a scheduler task: fan out directly */
                ret = __chunk_proto_write(nid, io, buf, magic, replica_count);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        } else {
                /* not in a task: run the write on the core selected by core_hash() */
                ret = core_request(core_hash(&chkinfo->id), -1, "chunk_proto_write",
                                   __chunk_proto_write_request, nid, io, buf, magic,
                                   replica_count);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(0, IO_WARN, "chunk_proto_write");

        return 0;
err_ret:
        /* callers see ENOENT as EREMCHG */
        ret = (ret == ENOENT) ? EREMCHG : ret;
        return ret;
}

/*
 * Per-replica context of one create sub-task (__chunk_proto_create).
 * One instance is filled in by chunk_proto_rep_create() for every target node.
 */
typedef struct {
        const char *pool;       /* pool name passed to replica_{srv,rpc}_create */
        const nid_t *nid;       /* target node of this sub-task */
        const chkid_t *chkid;   /* chunk id(s) to create */
        const chkid_t *parent;  /* id of the parent chunk */
        int chknum;             /* number of chunks behind chkid */
        int initzero;           /* forwarded unchanged to the create call */
        const buffer_t *buf;    /* forwarded unchanged to the create call */
        uint64_t info_version;  /* forwarded unchanged to the create call */

        int retval;             /* result of this sub-task, 0 on success */
        int *count;             /* shared counter of sub-tasks still running */
        task_t *task;           /* parent task, resumed by the last sub-task */
} chunk_create_ctx_t;

/**
 * Sub-task body: create the chunk replica on a single node.
 *
 * Uses replica_srv_create() when the target is the local node and
 * replica_rpc_create() otherwise. A failed create resets the ltime of the
 * target node. The outcome is stored in ctx->retval; the sub-task that brings
 * the shared counter down to zero resumes the parent task.
 *
 * @param arg  chunk_create_ctx_t of this sub-task
 */
STATIC void __chunk_proto_create(void *arg)
{
        int ret;
        time_t ltime;
        chunk_create_ctx_t *ctx = arg;

        ret = network_connect(ctx->nid, &ltime, 1, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (net_islocal(ctx->nid)) {
                ret = replica_srv_create(ctx->pool, ctx->nid, ctx->chkid, ctx->chknum,
                                         ctx->parent, -1, ctx->initzero, ctx->buf, ctx->info_version, 1);
        } else {
                ret = replica_rpc_create(ctx->pool, ctx->nid, ctx->chkid, ctx->chknum,
                                         ctx->parent, -1, ctx->initzero, ctx->buf, ctx->info_version, 1);
        }

        if (unlikely(ret))
                GOTO(err_close, ret);

        YASSERT(*ctx->count > 0);

        ctx->retval = 0;
        if (--(*ctx->count) == 0)
                schedule_resume(ctx->task, 0, NULL);

        return;
err_close:
        network_ltime_reset(ctx->nid, ltime, "__chunk_proto_create");
err_ret:
        ctx->retval = ret;
        if (--(*ctx->count) == 0)
                schedule_resume(ctx->task, 0, NULL);

        return;
}

/**
 * Create a chunk on all of its replica nodes in parallel.
 *
 * One sub-task (__chunk_proto_create) is started per node; the calling task
 * yields until the last sub-task resumes it. The sub-tasks keep pointers to
 * this frame (their context, the task handle and the shared counter).
 *
 * @param pool          pool name
 * @param chkids        chunk id(s) to create
 * @param chknum        number of chunks behind @chkids
 * @param nids          target nodes, @repnum entries
 * @param repnum        number of replicas, 1..LICH_REPLICA_MAX
 * @param parent        parent chunk id; must be a pool or volume chunk
 * @param parentnid     only used for logging
 * @param initzero      forwarded to the replica create call
 * @param info_version  forwarded to the replica create call
 * @param buf           forwarded to the replica create call
 * @param ec            unused
 * @return 0 when every replica was created; EAGAIN when at least one
 *         sub-task failed; otherwise the error of schedule_yield()
 */
int chunk_proto_rep_create(const char *pool, const chkid_t *chkids, int chknum,
                           const nid_t *nids, int repnum, const fileid_t *parent,
                           const nid_t *parentnid, int initzero, uint64_t info_version,
                           const buffer_t *buf, const ec_t *ec)
{
        int ret, i, success = 0, count;
        chunk_create_ctx_t _ctx[LICH_REPLICA_MAX], *ctx;
        task_t task;

        (void) ec;
        (void) parentnid;

        ANALYSIS_BEGIN(0);

        /*
         * _ctx is a fixed-size stack array indexed by repnum, and with
         * repnum <= 0 no sub-task would ever resume this task.
         */
        YASSERT(repnum > 0 && repnum <= LICH_REPLICA_MAX);
        YASSERT(parent && (parent->type == __POOL_CHUNK__ || parent->type == __VOLUME_CHUNK__));

        //YASSERT(chkinfo->id.id && chkid_cmp(parent, &chkinfo->id));

#if ENABLE_CHUNK_DEBUG
        DINFO("create chunk "CHKID_FORMAT" count %d @ %s/"CHKID_FORMAT"\n",
              CHKID_ARG(chkids), chknum, network_rname(parentnid), CHKID_ARG(parent));
#else
        DBUG("create chunk "CHKID_FORMAT" count %d @ %s/"CHKID_FORMAT"\n",
             CHKID_ARG(chkids), chknum, network_rname(parentnid), CHKID_ARG(parent));
#endif

        count = repnum;
        task = schedule_task_get();

        for (i = 0; i < repnum; i++) {
                ctx = &_ctx[i];
                ctx->pool = pool;
                ctx->nid = &nids[i];
                ctx->chkid = chkids;
                ctx->chknum = chknum;
                ctx->parent = parent;
                ctx->initzero = initzero;
                ctx->buf = buf;
                ctx->info_version = info_version;

                ctx->task = &task;
                ctx->count = &count;
                schedule_task_new("chunk_proto_create", __chunk_proto_create, ctx, -1);
        }

        /* woken up by the sub-task that drops count to zero */
        ret = schedule_yield("chunk_proto_create", NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (i = 0; i < repnum; i++) {
                ctx = &_ctx[i];
                if (ctx->retval == 0)
                        success++;
        }

        /* a partial create is reported as a failure of the whole call */
        if (unlikely(success != repnum)) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(0, IO_WARN, "chunk_proto_create");

        return 0;
err_ret:
        return ret;
}

/**
 * Collect the SHA1 digest of every replica of a chunk.
 *
 * Replicas on the local node are queried with replica_srv_sha1(), all others
 * with replica_rpc_sha1(). The first failure aborts the walk; result->count
 * is only written when every replica answered.
 *
 * @param chkinfo  chunk descriptor listing the replicas
 * @param result   out: result->md[i] receives the digest of replica i,
 *                 result->count the number of replicas
 * @return 0 on success, otherwise the error of the failing query
 */
int chunk_proto_rep_sha1(const chkinfo_t *chkinfo, sha1_result_t *result)
{
        int ret;
        int rep = 0;

        while (rep < (int)chkinfo->repnum) {
                const nid_t *owner = &chkinfo->diskid[rep].id;

                if (net_islocal(owner)) {
                        ret = replica_srv_sha1(&chkinfo->id, 0, result->md[rep]);
                } else {
                        ret = replica_rpc_sha1(owner, &chkinfo->id, 0, result->md[rep]);
                }

                if (unlikely(ret))
                        GOTO(err_ret, ret);

                rep++;
        }

        result->count = chkinfo->repnum;

        return 0;
err_ret:
        return ret;
}

/**
 * Bring the replicas of a chunk back in sync.
 *
 * Steps:
 *  1. give up with ECANCELED when fewer fault domains are online than the
 *     chunk has replicas;
 *  2. return 0 immediately when chunk_proto_intact() reports the chunk intact;
 *  3. run chunk_proto_rep_check();
 *  4. pick a clean replica with __chunk_get_clean(); when its clock differs
 *     from chkstat->chkstat_clock the call fails with EAGAIN;
 *  5. run __chunk_sync__() from the chosen replica.
 *
 * @param pool     pool name
 * @param chkinfo  chunk descriptor, may be updated
 * @param chkstat  per-replica state, may be updated
 * @param vfm      passed to the check and clean-replica selection steps
 * @param parent   id of the parent chunk
 * @param force    forwarded to chunk_proto_rep_check()
 * @param token    lease token
 * @param ec       forwarded to chunk_proto_rep_check()
 * @param oflags   optional out: flags produced by __chunk_sync__(); written
 *                 once that step has run, whether or not it succeeded
 * @return 0 on success, otherwise an errno value
 */
int chunk_proto_rep_sync(const char *pool, chkinfo_t *chkinfo, chkstat_t *chkstat,
                         vfm_t *vfm, const fileid_t *parent,
                         int force, const lease_token_t *token,
                         const ec_t *ec, int *oflags)
{
        int ret, idx = 0;
        clockstat_t clockstat;
        int total, online;

        ret = conn_faultdomain(&total, &online);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (online < chkinfo->repnum) {
                DBUG(CHKID_FORMAT" sync, fault domain online %u need %u, skip\n",
                     CHKID_ARG(&chkinfo->id), online, chkinfo->repnum);
                ret = ECANCELED;
                GOTO(err_ret, ret);
        }

        if (chunk_proto_intact(chkinfo, chkstat)) {
                goto out;
        }

        ANALYSIS_BEGIN(0);

#if ENABLE_CHUNK_DEBUG
        CHKINFO_DUMP(chkinfo, D_INFO);
        DINFO("chunk "CHKID_FORMAT" sync clock %ju\n",
              CHKID_ARG(&chkinfo->id), chkstat->chkstat_clock);
#else
        DBUG("chunk "CHKID_FORMAT" sync clock %ju\n",
             CHKID_ARG(&chkinfo->id), chkstat->chkstat_clock);
#endif

        ret = chunk_proto_rep_check(pool, chkinfo, chkstat, vfm, parent, force, token, ec, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_QUEUE(0, IO_WARN, "chunk_proto_sync_0");

        ANALYSIS_BEGIN(1);

        /* pick one replica and use it to overwrite the replicas that need it */
        ret = __chunk_get_clean(chkinfo, chkstat, vfm, &idx, &clockstat);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (clockstat.vclock.clock != chkstat->chkstat_clock) {
#if ENABLE_CHUNK_DEBUG
                DINFO("chunk "CHKID_FORMAT" in writing clock %ju --> %ju\n",
                      CHKID_ARG(&chkinfo->id), clockstat.vclock.clock,
                      chkstat->chkstat_clock);
#else
                DBUG("chunk "CHKID_FORMAT" in writing clock %ju --> %ju\n",
                     CHKID_ARG(&chkinfo->id), clockstat.vclock.clock,
                     chkstat->chkstat_clock);
#endif

                /* the replica's clock may lag behind the recorded clock, never lead it */
                YASSERT(clockstat.vclock.clock < chkstat->chkstat_clock);
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        /* args_init() already clears args.oflags.
         * NOTE(review): vfm is deliberately(?) passed as NULL here although
         * the caller supplied one - confirm */
        args_t args;
        args_init(&args, pool, parent, chkinfo, chkstat, NULL, token);

        ret = __chunk_sync__(&args, &clockstat, idx);
        if (oflags) {
                *oflags = args.oflags;

                DBUG("pool %s chunk "CHKID_FORMAT" oflags %d\n", pool, CHKID_ARG(&chkinfo->id), *oflags);
        }
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(1, IO_WARN, "chunk_proto_sync_1");

#if ENABLE_CHUNK_DEBUG
        CHKINFO_DUMP(chkinfo, D_INFO);
#endif

out:
        return 0;
err_ret:
        return ret;
}

/**
 * Look up a node among the replicas of a chunk.
 *
 * @param chkinfo  chunk descriptor
 * @param nid      node to look for
 * @return index of the first replica slot located on @nid, -1 if there is none
 */
STATIC int __chunk_exist(const chkinfo_t *chkinfo, const nid_t *nid)
{
        int slot = 0;

        while (slot < (int)chkinfo->repnum) {
                if (nid_cmp(&chkinfo->diskid[slot].id, nid) == 0)
                        return slot;

                slot++;
        }

        return -1;
}

/**
 * Move a chunk to a new set of replica nodes.
 *
 * Replicas that already live on one of the requested nodes are kept; for
 * every other requested node the chunk is pushed from replica 0 and the new
 * replica is connected to verify the push. On success _chkinfo / _chkstat are
 * replaced by the new layout and info_version is incremented; on failure they
 * are left untouched.
 *
 * The original note on this function says the overall move also covers
 * reclaiming the old replicas (msgqueue); nothing is reclaimed in this body -
 * presumably handled elsewhere, TODO confirm.
 *
 * @param pool      pool name
 * @param _chkinfo  in/out: chunk descriptor
 * @param _chkstat  in/out: per-replica state
 * @param vfm       only used for the initial consistency check
 * @param parent    id of the parent chunk
 * @param nid       requested replica nodes, @count entries
 * @param count     number of requested replicas
 * @param token     lease token used when connecting new replicas
 * @param ec        unused
 * @return 0 on success; EAGAIN when the chunk is inconsistent, replica 0 has
 *         a non-zero status or the chunk is being written; otherwise the
 *         error of the failing step
 */
int chunk_proto_rep_move(const char *pool, chkinfo_t *_chkinfo, chkstat_t *_chkstat,
                         vfm_t *vfm, const chkid_t *parent,
                         const nid_t *nid, int count, const lease_token_t *token, const ec_t *ec)
{
        int ret, idx, i;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        char _chkinfo_[CHKINFO_MAX], _chkstat_[CHKSTAT_MAX];
        clockstat_t clockstat, clockstat2;
        repstat_t repstat;

        (void) ec;

        if (unlikely(!chunk_proto_consistent(_chkinfo, _chkstat, vfm))) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        /* replica 0 is the push source below, so its status must be zero */
        if (_chkinfo->diskid[0].status) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        ret = chunk_getclock(&_chkinfo->diskid[0].id, &_chkinfo->id, &clockstat);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* a clock mismatch is treated as a write in progress: retry later */
        if (clockstat.vclock.clock != _chkstat->chkstat_clock) {
#if ENABLE_CHUNK_DEBUG
                DINFO("chunk "CHKID_FORMAT" in writing clock %ju --> %ju\n",
                      CHKID_ARG(&_chkinfo->id), clockstat.vclock.clock,
                      _chkstat->chkstat_clock);
#else
                DBUG("chunk "CHKID_FORMAT" in writing clock %ju --> %ju\n",
                     CHKID_ARG(&_chkinfo->id), clockstat.vclock.clock,
                     _chkstat->chkstat_clock);
#endif

                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        /* build the new layout in scratch buffers; the caller's copies are
         * only overwritten once every step has succeeded */
        chkinfo = (void *)_chkinfo_;
        chkstat = (void *)_chkstat_;

        *chkinfo = *_chkinfo;
        *chkstat = *_chkstat;

        // let A be the replica set of _chkinfo and B the requested set;
        // C is their intersection
        for (i = 0; i < count; i++) {
                idx = __chunk_exist(_chkinfo, &nid[i]);
                if (idx != -1) {
                        // in C: keep the existing replica, moved to slot i
                        chkinfo->diskid[i] = _chkinfo->diskid[idx];
                        chkstat->repstat[i] = _chkstat->repstat[idx];
                } else {
                        // new replica: B - A
                        ret = chunk_push(pool, &_chkinfo->id, &_chkinfo->diskid[0].id,
                                         &nid[i], &clockstat.vclock,
                                         chkinfo->info_version + 1, parent, -1, 0, "move");
                        if (unlikely(ret)) {
                                GOTO(err_ret, ret);
                        }

                        // make sure the push succeeded
                        ret = chunk_connect(&nid[i], &_chkinfo->id, parent, 0, token, &clockstat2, &repstat);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        chkstat->repstat[i] = repstat;

#if 0
                        ret = network_connect(&nid[i], &chkstat->repstat[i].ltime, 1, 0);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
#endif

                        chkinfo->diskid[i].id = nid[i];
                        chkinfo->diskid[i].status = __S_CLEAN;
                }
        }

        /* commit: publish the new layout with a bumped info_version */
        chkinfo->repnum = count;
        chkinfo->info_version++;
        CHKINFO_CP(_chkinfo, chkinfo);
        CHKSTAT_CP(_chkstat, chkstat, chkinfo->repnum);

        return 0;
err_ret:
        return ret;
}

/*
 * Operation table of the replication-based chunk protocol; registered under
 * CHUNK_PROTO_REP by chunk_proto_rep_init().
 */
struct chunk_proto_ops rep_ops = {
        .name           = "chunk_proto_rep",
        .create         = chunk_proto_rep_create,
        .unlink         = chunk_proto_rep_unlink,
        .sha1           = chunk_proto_rep_sha1,
        .check          = chunk_proto_rep_check,
        .sync           = chunk_proto_rep_sync,
        .move           = chunk_proto_rep_move,
        .read           = chunk_proto_rep_read,
        .write          = chunk_proto_rep_write,
};

/**
 * Register the replication protocol operations (rep_ops) under
 * CHUNK_PROTO_REP.
 *
 * @return result of chunk_proto_ops_register()
 */
int chunk_proto_rep_init(void)
{
        return chunk_proto_ops_register(&rep_ops, CHUNK_PROTO_REP);
}
